/*
 * Copyright 2011 Google Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.walkaround.wave.shared;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.walkaround.proto.Delta;
import com.google.walkaround.proto.DocumentDiffSnapshot;
import com.google.walkaround.proto.OperationBatch;
import com.google.walkaround.proto.ProtocolDocumentOperation;
import com.google.walkaround.proto.ProtocolWaveletOperation;
import com.google.walkaround.proto.ProtocolWaveletOperation.MutateDocument;
import com.google.walkaround.proto.WalkaroundDocumentSnapshot;
import com.google.walkaround.proto.WalkaroundWaveletSnapshot;
import com.google.walkaround.proto.WaveletDiffSnapshot;
import com.google.walkaround.slob.shared.MessageException;
import com.google.walkaround.wave.shared.OperationFactory.InvalidInputException;

import org.waveprotocol.wave.model.document.operation.DocInitialization;
import org.waveprotocol.wave.model.document.operation.DocOp;
import org.waveprotocol.wave.model.document.operation.algorithm.DocOpCollector;
import org.waveprotocol.wave.model.document.operation.impl.DocOpBuilder;
import org.waveprotocol.wave.model.id.IdUtil;
import org.waveprotocol.wave.model.id.ModernIdSerialiser;
import org.waveprotocol.wave.model.id.WaveletId;
import org.waveprotocol.wave.model.id.WaveletName;
import org.waveprotocol.wave.model.operation.wave.WaveletOperation;
import org.waveprotocol.wave.model.operation.wave.WaveletOperationContext;
import org.waveprotocol.wave.model.util.CollectionUtils;
import org.waveprotocol.wave.model.util.ReadableStringMap.ProcV;
import org.waveprotocol.wave.model.util.StringMap;
import org.waveprotocol.wave.model.version.HashedVersion;
import org.waveprotocol.wave.model.wave.InvalidParticipantAddress;
import org.waveprotocol.wave.model.wave.ParticipantId;
import org.waveprotocol.wave.model.wave.data.DocumentFactory;
import org.waveprotocol.wave.model.wave.data.impl.BlipDataImpl;
import org.waveprotocol.wave.model.wave.data.impl.WaveletDataImpl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import javax.annotation.Nullable;

/**
 * Class for serializing and deserializing wavelets and deltas to and from JSON.
 * It is meant to be used both by client as well as server.
 *
 * It works in two stages. First it serializes either wavelet or delta into an
 * instance of one of protobuf-generated classes. Then it uses specific
 * server or client version of serializer to turn Message into JSON
 * representation as String. Concrete serializers (ServerWaveSerializer and
 * ClientWaveSerializer) use classes generated by PST translator in order to
 * create JSON representations. The reverse process takes place when
 * deserializing back to Wavelet or Delta object
 *
 * @author piotrkaleta@google.com (Piotr Kaleta)
 */
public class WaveSerializer {

  private static class DocDiff {
    private static final DocOp EMPTY_OP = new DocOpBuilder().build();
    private final long lastReadVersion;
    private final DocOpCollector state;
    private final DocOpCollector diff;

    DocDiff(long lastReadVersion) {
      Preconditions.checkArgument(lastReadVersion >= 0, "lastReadVersion = %s", lastReadVersion);
      this.lastReadVersion = lastReadVersion;
      this.state = new DocOpCollector();
      this.diff = new DocOpCollector();
    }

    void addOperation(String documentId, long version, DocOp docOp) {
      if (!IdUtil.isBlipId(documentId) || version < lastReadVersion) {
        state.add(docOp);
      } else {
        diff.add(docOp);
      }
    }

    /**
     * @return the state. never null.
     */
    DocOp getState() {
      DocOp op = state.composeAll();
      return op != null ? op : EMPTY_OP;
    }

    /**
     * @return the diff, or null if none
     */
    @Nullable DocOp getDiff() {
      return diff.composeAll();
    }
  }

  private static final int VERSION_INC = 1;

  private final MessageSerializer serializer;

  private DocumentFactory<?> docFactory;

  /**
   * Creates a new serializer based on a concrete implementation of either
   * client or server side serializer
   *
   * @param serializer The concrete implementation of server/client side
   *        serializer
   */
  public WaveSerializer(MessageSerializer serializer) {
    this(serializer, WaveletUtil.DEFAULT_DOC_FACTORY);
  }

  public WaveSerializer(MessageSerializer serializer, DocumentFactory<?> docFactory) {
    this.serializer = serializer;
    this.docFactory = docFactory;
  }

  public void setDocFactory(DocumentFactory<?> docFactory) {
    this.docFactory = docFactory;
  }

  public String serializeOperation(WaveletOperation op) {
    ProtocolWaveletOperation opMessage = OperationSerializer.createMessage(op);
    return serializer.serializeOp(opMessage);
  }

  public String serializeDelta(WaveletOperation operation) {
    Delta delta = MessageFactoryHelper.createDelta();
    delta.setAuthor(operation.getContext().getCreator().getAddress());
    delta.setTimestampMillis(operation.getContext().getTimestamp());
    delta.setOperation(OperationSerializer.createMessage(operation));
    return serializer.serializeDelta(delta);
  }

  public List<String> serializeDeltas(List<WaveletOperation> input) {
    ImmutableList.Builder<String> b = ImmutableList.builder();
    for (WaveletOperation op : input) {
      b.add(serializeDelta(op));
    }
    return b.build();
  }

  /**
   * The extra parameters are required because they are not present in the
   * serialized form of a wavelet operation.
   */
  public WaveletOperation deserializeOperation(ProtocolWaveletOperation message,
      ParticipantId creator, long timestamp) throws MessageException {
    try {
      return OperationFactory.createWaveletOperation(
          new WaveletOperationContext(creator, timestamp, 1), message);
    } catch (InvalidInputException e) {
      throw new MessageException(e);
    }
  }

  public WaveletOperation deserializeDelta(Delta in) throws MessageException {
    return deserializeOperation(in.getOperation(),
        ParticipantId.ofUnsafe(in.getAuthor()), in.getTimestampMillis());
  }

  public WaveletOperation deserializeDelta(String in) throws MessageException {
    return deserializeDelta(serializer.deserializeDelta(in));
  }

  /**
   * Serializes wavelet to JSON string.
   */
  public String serializeWavelet(WaveletDataImpl wavelet) {
    Preconditions.checkNotNull(wavelet.getCreator(), "Null creator");
    return serializer.serializeWavelet(createWaveletMessage(wavelet));
  }

  /**
   * Deserializes wavelet back into WaveletDataImpl.
   *
   * @param waveletName WaveletName that the deserialized wavelet will have
   */
  public WaveletDataImpl deserializeWavelet(WaveletName waveletName, String serializedSnapshot)
      throws MessageException {
    return createWaveletData(waveletName, serializer.deserializeWavelet(serializedSnapshot));
  }

  /**
   * Returns the serialized wavelet with diffs.
   */
  public String serializeWaveletDiff(WaveletDataImpl intermediateWavelet,
      WaveletDataImpl currentWavelet, StringMap<Long> lastReadVersions, List<String> mutations) {
    return serializer.serializeDiff(createWaveletDiffMessage(
        intermediateWavelet, currentWavelet, lastReadVersions, mutations));
  }

  /**
   * Deserializes the serialized wavelet in diff format into instance of
   * WaveletDataImpl.
   *
   * @param waveletName WaveletName of the resulting wavelet
   */
  public WaveletDataImpl deserializeWaveletDiff(WaveletName waveletName, String serializedDiff)
      throws MessageException {
    return createWaveletData(waveletName, serializer.deserializeDiff(serializedDiff));
  }

  /**
   * Serializes OperationBatch to JSON string.
   */
  public String serializeOperationBatch(List<WaveletOperation> input) {
    Preconditions.checkState(input.size() >= 1, "Operation batch input should have >= 1 items");
    OperationBatch batch = MessageFactoryHelper.createOperationBatch();
    for (WaveletOperation op : input) {
      batch.addOperation(OperationSerializer.createMessage(op));
    }
    return serializer.serializeOperationBatch(batch);
  }

  public List<WaveletOperation> deserializeOperationBatch(
      ParticipantId author, String serializedOperationBatch,
      long timestamp) throws MessageException {
    OperationBatch batch = serializer.deserializeOperationBatch(serializedOperationBatch);
    List<WaveletOperation> operations = new ArrayList<WaveletOperation>();
    try {
      WaveletOperationContext context =
          new WaveletOperationContext(author, timestamp, VERSION_INC);
      for (ProtocolWaveletOperation op : batch.getOperation()) {
        operations.add(OperationFactory.createWaveletOperation(context, op));
      }
      return operations;
    } catch (InvalidInputException e) {
      throw new MessageException(e);
    }
  }

  /**
   * Deserializes the diff part of the serialized wavelet into a map from
   * documentid to operation that brings the document from last read state
   * to the actual state
   */
  public StringMap<DocOp> deserializeDocumentsDiffs(WaveletDiffSnapshot diffSnapshot)
      throws MessageException {
    StringMap<DocOp> docOps = CollectionUtils.createStringMap();
    try {
      for (DocumentDiffSnapshot docDiff : diffSnapshot.getDocument()) {
        DocOp op;
        if (docDiff.getDiff() == null) {
          op = null;
        } else {
          op = OperationFactory.createDocumentOperation(docDiff.getDiff());
        }
        docOps.put(docDiff.getDocumentId(), op);
      }
      return docOps;
    } catch (InvalidInputException e) {
      throw new MessageException(e);
    }
  }


  /**
   * Creates wavelet snapshot with diff
   */
  public WaveletDiffSnapshot createWaveletDiffMessage(WaveletDataImpl intermediateWavelet,
      WaveletDataImpl currentWavelet, StringMap<Long> lastReadVersions, List<String> mutations) {
    WaveletDiffSnapshot waveletDiff = MessageFactoryHelper.createWaveletDiffSnapshot();
    waveletDiff.setWaveletId(
        ModernIdSerialiser.INSTANCE.serialiseWaveletId(currentWavelet.getWaveletId()));
    waveletDiff.addAllParticipant(listOfParticipantAddresses(currentWavelet.getParticipants()));

    waveletDiff.addAllDocument(getDocumentDiffs(
      intermediateWavelet, mutations, lastReadVersions, currentWavelet));
    waveletDiff.setVersion(currentWavelet.getVersion());
    waveletDiff.setLastModifiedTime(currentWavelet.getLastModifiedTime());
    waveletDiff.setCreator(currentWavelet.getCreator().getAddress());
    waveletDiff.setCreationTime(currentWavelet.getCreationTime());

    return waveletDiff;
  }

  /**
   * Method responsible for creating DocumentDiffSnapshot object out of a
   * mutation history of a wavelet and last read version specified in map.
   */
  private List<DocumentDiffSnapshot> getDocumentDiffs(WaveletDataImpl intermediateWavelet,
      List<String> mutations, StringMap<Long> lastReadVersions, WaveletDataImpl headWavelet) {
    try {
      StringMap<DocDiff> documentDiffMap =
          createDocumentDiffMap(intermediateWavelet, mutations, lastReadVersions);
      return createDocumentDiffList(documentDiffMap, headWavelet);
    } catch (InvalidInputException e) {
      throw new RuntimeException(e);
    } catch (MessageException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Creates a map that maps documentId's to objects representing the state of
   * the document up to the last read version (specified in map) as well as the
   * differences that were made to document after last read version.
   */
  private StringMap<DocDiff> createDocumentDiffMap(
      WaveletDataImpl intermediateWavelet, List<String> tailMutations,
      StringMap<Long> lastReadVersions)
      throws MessageException, InvalidInputException {
    StringMap<DocDiff> documentDiffMap = CollectionUtils.createStringMap();
    WaveletId waveletId = intermediateWavelet.getWaveletId();
    long intermediateVersion = intermediateWavelet.getVersion();

    for (String documentId : intermediateWavelet.getDocumentIds()) {
      Long lastReadVersion = lastReadVersions.get(documentId, 0L);
      BlipDataImpl document = intermediateWavelet.getDocument(documentId);
      DocDiff docDiff = new DocDiff(lastReadVersion);

      Preconditions.checkArgument(lastReadVersion >= intermediateVersion
          || lastReadVersion == 0 || lastReadVersion >= document.getLastModifiedVersion(),
          "intermediate wavelet %s is newer (@%s) than last read version %s of doc %s",
          waveletId, intermediateVersion, lastReadVersion, documentId);

      DocOp state = document.getContent().asOperation();
      docDiff.addOperation(documentId, 0, state);

      documentDiffMap.put(documentId, docDiff);
    }

    long version = intermediateVersion;
    for (String mutation : tailMutations) {
      ProtocolWaveletOperation op = serializer.deserializeDelta(mutation).getOperation();

      if (op.hasMutateDocument()) {
        MutateDocument mutateDocument = op.getMutateDocument();
        String documentId = mutateDocument.getDocumentId();

        DocDiff docDiff = documentDiffMap.get(documentId);
        if (docDiff == null) {
          docDiff = new DocDiff(lastReadVersions.get(documentId, 0L));
          documentDiffMap.put(documentId, docDiff);
        }

        long lastReadVersion = lastReadVersions.get(documentId, -1L);

        docDiff.addOperation(mutateDocument.getDocumentId(), version,
            OperationFactory.createDocumentOperation(mutateDocument.getDocumentOperation()));
      }

      version++;
    }

    return documentDiffMap;
  }

  /**
   * Creates a list of objects representing snapshot of documents in diff format
   * from the previously created map of documents to DocDiffs.
   */
  private List<DocumentDiffSnapshot> createDocumentDiffList(
      StringMap<DocDiff> diffSnapshotMap, final WaveletDataImpl headWavelet) {
    final List<DocumentDiffSnapshot> result = new ArrayList<DocumentDiffSnapshot>();
    diffSnapshotMap.each(new ProcV<DocDiff>() {
      @Override
      public void apply(String documentId, DocDiff diffState) {
        BlipDataImpl document = headWavelet.getDocument(documentId);
        if (document != null) {
          DocumentDiffSnapshot docDiff = MessageFactoryHelper.createDocumentDiffSnapshot();

          docDiff.setDocumentId(documentId);
          docDiff.setAuthor(document.getAuthor().getAddress());
          docDiff.addAllContributor(listOfParticipantAddresses(document.getContributors()));
          // TODO(piotrkaleta): Add contributor diffs once Walkaround supports
          // them
          docDiff.addAllAddedContributor(Collections.<String> emptyList());
          docDiff.addAllRemovedContributor(Collections.<String> emptyList());
          docDiff.setLastModifiedVersion(document.getLastModifiedVersion());
          docDiff.setLastModifiedTime(document.getLastModifiedTime());
          docDiff.setState(OperationSerializer.createMutationOp(diffState.getState()));
          DocOp diffOp = diffState.getDiff();
          if (diffOp != null) {
            docDiff.setDiff(OperationSerializer.createMutationOp(diffOp));
          }
          result.add(docDiff);
        }
      }
    });
    return result;
  }

  /**
   * First stage of serialization - creates a Protobuf Message class instance
   * from wavelet.
   */
  public WalkaroundWaveletSnapshot createWaveletMessage(WaveletDataImpl waveletData) {
    WalkaroundWaveletSnapshot wavelet = MessageFactoryHelper.createWaveletSnapshot();
    wavelet.setVersion(waveletData.getVersion());
    wavelet.setCreator(waveletData.getCreator().getAddress());
    wavelet.setCreationTime(waveletData.getCreationTime());
    wavelet.setLastModifiedTime(waveletData.getLastModifiedTime());
    wavelet.addAllDocument(listOfDocuments(waveletData));
    wavelet.addAllParticipant(listOfParticipantAddresses(waveletData.getParticipants()));
    return wavelet;
  }

  /**
   * Retrieves list of documents in a wavelet.
   */
  private List<WalkaroundDocumentSnapshot> listOfDocuments(WaveletDataImpl wavelet) {
    List<WalkaroundDocumentSnapshot> documents = new ArrayList<WalkaroundDocumentSnapshot>();
    for (String name : wavelet.getDocumentIds()) {
      BlipDataImpl blip = wavelet.getDocument(name);
      documents.add(toProtoBuf(blip));
    }
    return documents;
  }

  /**
   * Serializes blip into Message.
   */
  private WalkaroundDocumentSnapshot toProtoBuf(BlipDataImpl blip) {
    WalkaroundDocumentSnapshot document = MessageFactoryHelper.createDocumentSnapshot();

    document.setDocumentId(blip.getId());
    document.setAuthor(blip.getAuthor().getAddress());
    document.setLastModifiedTime(blip.getLastModifiedTime());
    document.setLastModifiedVersion(blip.getLastModifiedVersion());
    document.addAllContributor(listOfParticipantAddresses(blip.getContributors()));

    ProtocolDocumentOperation documentOperation =
        OperationSerializer.createMutationOp(blip.getContent().asOperation());

    document.setContent(documentOperation);

    return document;
  }

  /**
   * Turns set of participant addresses into list of string-serialized addresses.
   *
   * @param participantIds Set of ids of participants
   * @return list of serialized addresses
   */
  private List<String> listOfParticipantAddresses(Set<ParticipantId> participantIds) {
    List<String> participants = new ArrayList<String>();
    for (ParticipantId id : participantIds) {
      participants.add(id.getAddress());
    }
    return participants;
  }

  private void addParticipants(WaveletDataImpl wavelet, List<String> addresses)
      throws InvalidParticipantAddress {
    for (String address : addresses) {
      wavelet.addParticipant(ParticipantId.of(address));
    }
  }

  /**
   * Turns list of serialized participant addresses back into deserialized
   * ParticipantId's
   */
  private List<ParticipantId> listOfParticipants(List<String> addresses)
      throws InvalidParticipantAddress {
    List<ParticipantId> participants = new ArrayList<ParticipantId>();
    for (String address : addresses) {
      participants.add(ParticipantId.of(address));
    }
    return participants;
  }

  // TODO(piotrkaleta): There's a lot of duplicated code down there. This is
  // because we use diff format for conversational wavelets, whereas non-diff
  // format for user data wavelets. The best way to fix it is to make udw's also
  // use diff format with the diff field always set to null.

  /**
   * Deserializes documents from list and adds them to wavelet.
   *
   * @param wavelet Wavelet to add deserialized documents to
   * @param documents Docs to deserialize and add
   */
  private void addDocuments(
      WaveletDataImpl wavelet, List<? extends WalkaroundDocumentSnapshot> documents)
      throws InvalidParticipantAddress, InvalidInputException {
    for (WalkaroundDocumentSnapshot document : documents) {
      String docId = document.getDocumentId();
      ParticipantId author = ParticipantId.of(document.getAuthor());
      List<ParticipantId> contributors = listOfParticipants(document.getContributor());

      DocInitialization content =
          OperationFactory.createDocumentInitialization(document.getContent());

      long docLastModifiedTime = (long) document.getLastModifiedTime();
      long lastModifiedVersion = (long) document.getLastModifiedVersion();

      wavelet.createDocument(
          docId, author, contributors, content, docLastModifiedTime, lastModifiedVersion);
    }
  }

  /**
   * Deserializes documents from list and adds them to wavelet in diff format.
   *
   * @param wavelet Wavelet to add deserialized documents to
   * @param documents Docs to deserialize and add
   */
  private void addDiffDocuments(
      WaveletDataImpl wavelet, List<? extends DocumentDiffSnapshot> documents)
      throws InvalidParticipantAddress, InvalidInputException {
    for (DocumentDiffSnapshot document : documents) {
      String docId = document.getDocumentId();
      ParticipantId author = ParticipantId.of(document.getAuthor());
      List<ParticipantId> contributors = listOfParticipants(document.getContributor());

      DocInitialization content =
          OperationFactory.createDocumentInitialization(document.getState());

      long docLastModifiedTime = (long) document.getLastModifiedTime();
      long lastModifiedVersion = (long) document.getLastModifiedVersion();

      wavelet.createDocument(
          docId, author, contributors, content, docLastModifiedTime, lastModifiedVersion);
    }
  }

  /**
   * Deserializes protobuf message representing wavelet and turns it into
   * WaveletDataImpl object.
   *
   * @param waveletName WaveletName that the wavelet will have
   */
  public WaveletDataImpl createWaveletData(
      WaveletName waveletName, WalkaroundWaveletSnapshot waveletMessage)
      throws MessageException {
    try {
      ParticipantId creator = ParticipantId.of(waveletMessage.getCreator());
      long creationTime = (long) waveletMessage.getCreationTime();
      long lastModifiedTime = (long) waveletMessage.getLastModifiedTime();

      HashedVersion hashedVersion = HashedVersion.unsigned(0);
      long version = (long) waveletMessage.getVersion();

      WaveletDataImpl wavelet =
          new WaveletDataImpl(waveletName.waveletId, creator, creationTime, version, hashedVersion,
              lastModifiedTime, waveletName.waveId, docFactory);

      addParticipants(wavelet, waveletMessage.getParticipant());
      addDocuments(wavelet, waveletMessage.getDocument());

      return wavelet;
    } catch (InvalidParticipantAddress e) {
      throw new MessageException("Invalid participant address", e);
    } catch (InvalidInputException e) {
      throw new MessageException("Invalid input", e);
    }
  }

  /**
   * Deserializes protobuf message representing wavelet in diff format and turns
   * it into WaveletDataImpl object.
   *
   * @param waveletName WaveletName that the wavelet will have
   */
  public WaveletDataImpl createWaveletData(
      WaveletName waveletName, WaveletDiffSnapshot waveletMessage)
      throws MessageException {
    try {
      ParticipantId creator = ParticipantId.of(waveletMessage.getCreator());
      long creationTime = (long) waveletMessage.getCreationTime();
      long lastModifiedTime = (long) waveletMessage.getLastModifiedTime();

      HashedVersion hashedVersion = HashedVersion.unsigned(0);
      long version = (long) waveletMessage.getVersion();

      WaveletDataImpl wavelet =
          new WaveletDataImpl(waveletName.waveletId, creator, creationTime, version, hashedVersion,
              lastModifiedTime, waveletName.waveId, docFactory);

      addParticipants(wavelet, waveletMessage.getParticipant());
      addDiffDocuments(wavelet, waveletMessage.getDocument());

      return wavelet;
    } catch (InvalidParticipantAddress e) {
      throw new MessageException("Invalid participant address", e);
    } catch (InvalidInputException e) {
      throw new MessageException("Invalid input", e);
    }
  }

}
